package org.codehaus.activemq.store.jdbc.adapter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.jms.JMSException;
import org.codehaus.activemq.store.jdbc.StatementProvider;

/**
 * JDBC adapter that moves message payloads in and out of the database through
 * {@link Blob} locators rather than by binding the byte array directly.
 *
 * The SQL text itself comes from the inherited <code>statementProvider</code>;
 * this class only decides how the payload column is written and read.
 */
public class BlobJDBCAdapter extends DefaultJDBCAdapter
{
  /** Size of the chunk used when copying a BLOB into memory. */
  private static final int COPY_BUFFER_SIZE = 4096;

  /**
   * Creates an adapter using the superclass defaults.
   */
  public BlobJDBCAdapter()
  {
  }

  /**
   * Creates an adapter that takes its SQL statements from the given provider.
   *
   * @param provider statement provider handed to the superclass
   */
  public BlobJDBCAdapter(StatementProvider provider)
  {
    super(provider);
  }

  /**
   * Stores a message in three steps: insert a row with a placeholder payload,
   * select that row back to obtain its {@link Blob}, write <code>data</code>
   * into the blob, and finally execute the update statement that binds the
   * filled blob to the row.
   *
   * @param c               connection to run the statements on
   * @param seq             sequence number identifying the row
   * @param messageID       message identifier, bound to the insert and used in error messages
   * @param destinationName destination name bound to the insert
   * @param data            message payload to store; must not be null
   * @throws SQLException if a statement fails or the blob cannot be written
   * @throws JMSException if the insert or update does not affect exactly one row,
   *                      or the inserted row cannot be selected back
   */
  public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException, JMSException
  {
    PreparedStatement insert = null;
    PreparedStatement find = null;
    PreparedStatement update = null;
    ResultSet rs = null;
    try
    {
      // Step 1: create the row. Parameter 4 gets a one-character placeholder,
      // presumably so the payload column holds a locator that can be fetched
      // below (confirm against the statement provider's SQL).
      insert = c.prepareStatement(this.statementProvider.getAddMessageStatment());
      insert.setLong(1, seq);
      insert.setString(2, destinationName);
      insert.setString(3, messageID);
      insert.setString(4, " ");
      if (insert.executeUpdate() != 1) {
        throw new JMSException("Failed to broker message: " + messageID + " in container.");
      }

      // Step 2: select the row back to get hold of its blob.
      find = c.prepareStatement(this.statementProvider.getFindMessageStatment());
      find.setLong(1, seq);
      rs = find.executeQuery();
      if (!rs.next()) {
        throw new JMSException("Failed to broker message: " + messageID + " in container.");
      }

      // Step 3: replace the placeholder content with the real payload.
      // Drop the placeholder first so nothing of it survives a short (or empty)
      // payload, then write from position 1 -- Blob positions are 1-based and
      // the argument of setBinaryStream is a start offset, not a length.
      Blob blob = rs.getBlob(1);
      blob.truncate(0);
      OutputStream stream = blob.setBinaryStream(1);
      try {
        stream.write(data);
      } finally {
        stream.close();
      }

      // Step 4: bind the filled blob back to the row and actually run the update.
      // The select statement is deliberately still open here: closing it may
      // invalidate the blob locator on some drivers.
      update = c.prepareStatement(this.statementProvider.getUpdateMessageStatment());
      update.setBlob(1, blob);
      update.setLong(2, seq);
      if (update.executeUpdate() != 1) {
        throw new JMSException("Failed to broker message: " + messageID + " in container.");
      }
    }
    catch (IOException e) {
      throw ((SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e));
    }
    finally {
      releaseResultSet(rs);
      releaseStatement(update);
      releaseStatement(find);
      releaseStatement(insert);
    }
  }

  /**
   * Loads the payload of the message with the given sequence number.
   *
   * @param c   connection to run the query on
   * @param seq sequence number identifying the row
   * @return the payload bytes, or <code>null</code> if no row matches or the
   *         payload column is SQL NULL
   * @throws SQLException if the query fails or the blob cannot be read
   */
  public byte[] doGetMessage(Connection c, long seq) throws SQLException {
    PreparedStatement s = null;
    ResultSet rs = null;
    try
    {
      s = c.prepareStatement(this.statementProvider.getFindMessageStatment());
      s.setLong(1, seq);
      rs = s.executeQuery();

      if (!rs.next()) {
        return null;
      }

      Blob blob = rs.getBlob(1);
      if (blob == null) {
        // getBlob returns null for SQL NULL; report it as "no payload"
        // instead of failing with a NullPointerException.
        return null;
      }

      InputStream is = blob.getBinaryStream();
      try {
        // Pre-size the buffer from the blob length, then copy in chunks.
        ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
        byte[] buffer = new byte[COPY_BUFFER_SIZE];
        int read;
        while ((read = is.read(buffer)) != -1) {
          os.write(buffer, 0, read);
        }
        return os.toByteArray();
      } finally {
        is.close();
      }
    }
    catch (IOException e)
    {
      throw ((SQLException)new SQLException("BLOB could not be read: " + e).initCause(e));
    } finally {
      releaseResultSet(rs);
      releaseStatement(s);
    }
  }

  /**
   * Closes a result set if one was opened. A failure to close is ignored so it
   * cannot replace the outcome (result or exception) of the surrounding call.
   */
  private static void releaseResultSet(ResultSet rs) {
    if (rs != null) {
      try {
        rs.close();
      } catch (SQLException ignored) {
        // best-effort cleanup
      }
    }
  }

  /**
   * Closes a statement if one was prepared. A failure to close is ignored so it
   * cannot replace the outcome (result or exception) of the surrounding call.
   */
  private static void releaseStatement(PreparedStatement s) {
    if (s != null) {
      try {
        s.close();
      } catch (SQLException ignored) {
        // best-effort cleanup
      }
    }
  }
}